/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.core.handler;

import com.chinamobile.cmss.lakehouse.common.exception.BaseException;
import com.chinamobile.cmss.lakehouse.common.kubernetes.K8sModelConstant;
import com.chinamobile.cmss.lakehouse.core.client.IKubernetesClient;
import com.chinamobile.cmss.lakehouse.core.client.InformerException;
import com.chinamobile.cmss.lakehouse.core.client.impl.InformerClientImpl;

import java.util.ArrayList;
import java.util.List;

import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1ObjectMetaBuilder;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServiceList;
import io.kubernetes.client.openapi.models.V1ServicePort;
import io.kubernetes.client.openapi.models.V1ServiceSpec;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Util for discovering available node ports in k8s
 */
@Slf4j
@Component
public class NodePortHandler {

    @Autowired
    protected IKubernetesClient kubernetesClient;

    @Autowired
    private InformerClientImpl informerClient;

    public List<Integer> acquireNodePorts(String namespace, int ports) {
        V1Service discoverService = mockDiscoverNodePortService(ports);
        try {
            V1Service resultService = informerClient.createService(namespace, discoverService);
            List<Integer> availablePorts = getNodePorts(resultService.getMetadata().getNamespace(),
                resultService.getMetadata().getName());
            log.info("availablePorts {} ", availablePorts.toString());
            if (ports != availablePorts.size()) {
                log.error("acquired ports doesn't match needed ports, need {} ports but found {} ports",
                    ports, availablePorts.size());
                throw new BaseException("ports doesn't match");
            }
            return availablePorts;
        } catch (ApiException e) {
            log.error("acquire node ports error {} ", e.getResponseBody(), e);
            throw new RuntimeException(e);
        } catch (InformerException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                if (kubernetesClient.existService(namespace, discoverService.getMetadata().getName())) {
                    informerClient.deleteService(namespace, discoverService.getMetadata().getName());
                }
            } catch (ApiException | InformerException e) {
                // ignore
            }
        }
    }

    private List<Integer> getNodePorts(String namespace, String serviceName) throws ApiException {
        V1ServiceList serviceList = kubernetesClient.listService(namespace);
        List<Integer> ports = new ArrayList<>();
        serviceList.getItems().stream().filter(s -> serviceName.equals(s.getMetadata().getName()))
            .forEach(
                service -> service.getSpec().getPorts().forEach(port -> ports.add(port.getNodePort())));
        return ports;
    }

    private V1Service mockDiscoverNodePortService(int ports) {
        V1Service discoverService =
            new V1Service().apiVersion(K8sModelConstant.API_VERSION).kind(K8sModelConstant.SERVICE_KIND)
                .metadata(new V1ObjectMetaBuilder().withName(K8sModelConstant.SERVICE_NAME_DISCOVER)
                    .withLabels(K8sModelConstant.DISCOVER_SERVICE_LABELS).build());
        discoverService.setSpec(new V1ServiceSpec().ports(mockServicePorts(ports))
            .selector(K8sModelConstant.DISCOVER_SERVICE_LABELS).type(K8sModelConstant.NODE_PORT));
        return discoverService;
    }

    private List<V1ServicePort> mockServicePorts(int ports) {
        List<V1ServicePort> discoverFakePorts = new ArrayList<>(ports);
        String discoverPort = "mock-discover-port";
        int start = 26000;
        for (int i = 1; i <= ports; i++) {
            discoverFakePorts.add(new V1ServicePort().name(discoverPort + i).port(start + i));
        }
        return discoverFakePorts;
    }

}
